LOADING...

加载过慢请开启缓存(浏览器默认开启)

loading

RAGFlow执行流程解析canvas

2025/3/19 AI

Canvas.py

import logging
import json
from copy import deepcopy
from functools import partial

import pandas as pd

from agent.component import component_class
from agent.component.base import ComponentBase


class Canvas:
    """
    dsl = {
        "components": {
            "begin": {
                "obj":{
                    "component_name": "Begin",
                    "params": {},
                },
                "downstream": ["answer_0"],
                "upstream": [],
            },
            "answer_0": {
                "obj": {
                    "component_name": "Answer",
                    "params": {}
                },
                "downstream": ["retrieval_0"],
                "upstream": ["begin", "generate_0"],
            },
            "retrieval_0": {
                "obj": {
                    "component_name": "Retrieval",
                    "params": {}
                },
                "downstream": ["generate_0"],
                "upstream": ["answer_0"],
            },
            "generate_0": {
                "obj": {
                    "component_name": "Generate",
                    "params": {}
                },
                "downstream": ["answer_0"],
                "upstream": ["retrieval_0"],
            }
        },
        "history": [],
        "messages": [],
        "reference": [],
        "path": [["begin"]],
        "answer": []
    }
    """
    # dsl是组件流的JSON格式字符串 定义了组件的连接关系
    # tenant_id多租户管理
    # component存储所有组件实例
    def __init__(self, dsl: str, tenant_id=None):
        self.path = []
        self.history = []
        self.messages = []
        self.answer = []
        self.components = {}
        # 如果提供了dsl则解析JSON 否则使用默认的DSL结构(包含 Begin 组件)
        self.dsl = json.loads(dsl) if dsl else {
            "components": {
                "begin": {
                    "obj": {
                        "component_name": "Begin",
                        "params": {
                            "prologue": "Hi there!"
                        }
                    },
                    "downstream": [],
                    "upstream": [],
                    "parent_id": ""
                }
            },
            "history": [],
            "messages": [],
            "reference": [],
            "path": [],
            "answer": []
        }
        self._tenant_id = tenant_id
        self._embed_id = ""
        self.load()

    # 加载dsl
    def load(self):
      # 解析dsl["components"]存储组件实例 并检查必要组件是否存在
        # 组件字典存入self.components
        self.components = self.dsl["components"]
        # cpn_nms记录组件名称集合
        cpn_nms = set([])

        # 遍历组件 提取名称
        for k, cpn in self.components.items():
            cpn_nms.add(cpn["obj"]["component_name"])
        # 确保Begin和Answer组件存在 否则抛出错误
        assert "Begin" in cpn_nms, "There have to be an 'Begin' component."
        assert "Answer" in cpn_nms, "There have to be an 'Answer' component."

        # for循环便利self.components这个字典
        # 其中键k是组件标识符 cpn是一个包含组件详细信息的字典
        for k, cpn in self.components.items():
            # 将组件名称添加到cpn_nms集合中
            cpn_nms.add(cpn["obj"]["component_name"])
            # 通过component_class动态创建 拼接param
            param = component_class(cpn["obj"]["component_name"] + "Param")()
            # 更新param实例的属性 将cpn["obj"]["params"]字典中的键值对应用到param实例上
            param.update(cpn["obj"]["params"])
            param.check()
          # 使用component_class动态创建了一个新的组件实例 并将其赋值给cpn["obj"]
            # 新实例的创建使用了当前类实例self、组件标识符k和参数实例param
            cpn["obj"] = component_class(cpn["obj"]["component_name"])(self, k, param)
            # 检查当前组件的名称是否为"Categorize"
            if cpn["obj"].component_name == "Categorize":
                # 组件是"Categorize" 则遍历param.category_description字典 这个字典包含了分类描述
                for _, desc in param.category_description.items():
                    # 这两行代码检查desc["to"](可能是下游组件的标识符)是否已经在cpn["downstream"]列表中
                    if desc["to"] not in cpn["downstream"]:
                        # 如果不在,就将其添加到列表中
                        cpn["downstream"].append(desc["to"])
        # 将self.dsl["path"]的值赋给self.path属性
        self.path = self.dsl["path"]
        self.history = self.dsl["history"]
        self.messages = self.dsl["messages"]
        self.answer = self.dsl["answer"]
        self.reference = self.dsl["reference"]
        self._embed_id = self.dsl.get("embed_id", "")

    # 定义了类的字符串表示形式 通常用于打印对象时显示的信息
    # 这几行代码将类的属性值赋给self.dsl字典中的相应键
    def __str__(self):
        self.dsl["path"] = self.path
        self.dsl["history"] = self.history
        self.dsl["messages"] = self.messages
        self.dsl["answer"] = self.answer
        self.dsl["reference"] = self.reference
        self.dsl["embed_id"] = self._embed_id
        # 创建一个新的字典dsl 其中包含一个键components 其值为一个空字典
        dsl = {
            "components": {}
        }
        # 遍历self.dsl字典的键 如果键不是components 则将其值深拷贝到新的dsl字典中
        for k in self.dsl.keys():
            if k in ["components"]:
                continue
            dsl[k] = deepcopy(self.dsl[k])

        # 遍历self.components字典 将每个组件的信息深拷贝到dsl字典的components键下
        # 对于obj键 将其值转换为字符串后再解析为JSON
        for k, cpn in self.components.items():
            if k not in dsl["components"]:
                dsl["components"][k] = {}
            for c in cpn.keys():
                if c == "obj":
                    dsl["components"][k][c] = json.loads(str(cpn["obj"]))
                    continue
                dsl["components"][k][c] = deepcopy(cpn[c])
        # 返回dsl字典的JSON字符串表示形式 ensure_ascii=False确保非ASCII字符可以正确显示
        return json.dumps(dsl, ensure_ascii=False)

    # 将类的属性重置为空列表
    def reset(self):
        self.path = []
        self.history = []
        self.messages = []
        self.answer = []
        self.reference = []
        # 遍历self.components字典 并调用每个组件的reset方法
        for k, cpn in self.components.items():
            self.components[k]["obj"].reset()
        # 将_embed_id属性重置为空字符串
        self._embed_id = ""

    # 这个方法用于根据组件ID获取组件名称
    def get_component_name(self, cid):
        # 遍历self.dsl字典中的graph键下的nodes列表
        # 如果找到匹配的ID 则返回相应的组件名称
        for n in self.dsl["graph"]["nodes"]:
            if cid == n["id"]:
                return n["data"]["name"]
        # 如果没有找到匹配的ID 则返回空字符串
        return ""

    def run(self, **kwargs):
        # 如果self.answer列表不为空
        if self.answer:
            # 用于存储接下来要执行的组件ID
            cpn_id = self.answer[0]
            # 存完就从列表中移除
            self.answer.pop(0)
            try:
                # 尝试执行 self.components 字典中对应 cpn_id 的组件对象的 run 方法
                # 并传入 self.history 和 **kwargs 作为参数
                ans = self.components[cpn_id]["obj"].run(self.history, **kwargs)
                # 如果执行过程中发生异常
            except Exception as e:
                # 捕获该异常并使用 ComponentBase.be_output 方法处理异常信息
                ans = ComponentBase.be_output(str(e))
            # 将执行过的组件ID添加到 self.path 列表的最后一个元素中
            # self.path 似乎用于记录执行路径
            self.path[-1].append(cpn_id)
            # 如果 kwargs 中包含关键字 “stream”
            # 则假设 ans 是一个生成器 遍历它并逐个 yield 出来
            # 如果不是 则直接 yield ans
            if kwargs.get("stream"):
                for an in ans():
                    yield an
            else:
                yield ans
            return
        # 如果 self.path 为空 则执行名为 “begin” 的组件的 run 方法 并将 “begin” 添加到 self.path 中
        if not self.path:
            self.components["begin"]["obj"].run(self.history, **kwargs)
            self.path.append(["begin"])
        # 向 self.path 添加一个空列表 为接下来的组件执行做准备
        self.path.append([])

        ran = -1
        waiting = []
        without_dependent_checking = []

        # 当有下游组件downstream时
        # 它接受一个组件列表 cpns 作为参数
        def prepare2run(cpns):
            # 这些变量在函数外部定义 但在函数内部可以被修改
            nonlocal ran, ans
            # 遍历 cpns 列表中的每个组件ID c
            # 如果 c 已经是 self.path 中最后一个列表的最后一个元素 则跳过当前循环
            for c in cpns:
                if self.path[-1] and c == self.path[-1][-1]:
                    continue
                # 从 self.components 字典中获取组件ID c 对应的组件对象
                cpn = self.components[c]["obj"]
                # 如果组件的名称是 “Answer” 则将组件ID c 添加到 self.answer 列表中
                if cpn.component_name == "Answer":
                    self.answer.append(c)
                else:
                    # 如果组件不是 “Answer” 则记录调试信息 检查组件是否有未执行的依赖组件
                    # 如果有 将组件ID添加到 waiting 列表 并跳过当前循环
                    logging.debug(f"Canvas.prepare2run: {c}")
                    if c not in without_dependent_checking:
                        cpids = cpn.get_dependent_components()
                        if any([cc not in self.path[-1] for cc in cpids]):
                            if c not in waiting:
                                waiting.append(c)
                            continue
                    yield "*'{}'* is running...🕞".format(self.get_component_name(c))
                    # 如果组件是 “Iteration” 则获取其开始组件 并检查是否已经结束 如果没有结束 则更新 cpn 和 c 为开始组件
                    if cpn.component_name.lower() == "iteration":
                        st_cpn = cpn.get_start()
                        assert st_cpn, "Start component not found for Iteration."
                        if not st_cpn["obj"].end():
                            cpn = st_cpn["obj"]
                            c = cpn._id
                    # 尝试运行组件
                    try:
                        ans = cpn.run(self.history, **kwargs)
                    # 并捕获任何异常 如果发生异常 记录错误并更新 ran 然后重新抛出异常
                    except Exception as e:
                        logging.exception(f"Canvas.run got exception: {e}")
                        self.path[-1].append(c)
                        ran += 1
                        raise e
                    # 将组件ID c 添加到 self.path 的最后一个列表中
                    self.path[-1].append(c)
            # 增加 ran 计数器的值
            ran += 1
        """
        接下来的代码块处理下游组件的运行 循环检测 以及组件输出的合并
        这部分代码涉及到更复杂的逻辑,如处理循环、条件分支、组件输出等
        """
        # 获取上一个组件的下游组件列表
        downstream = self.components[self.path[-2][-1]]["downstream"]
        # 如果没有下游组件 但存在父组件ID 则合并父组件和当前组件的输出
        if not downstream and self.components[self.path[-2][-1]].get("parent_id"):
            cid = self.path[-2][-1]
            pid = self.components[cid]["parent_id"]
            o, _ = self.components[cid]["obj"].output(allow_partial=False)
            oo, _ = self.components[pid]["obj"].output(allow_partial=False)
            self.components[pid]["obj"].set_output(pd.concat([oo, o], ignore_index=True).dropna())
            downstream = [pid]
        # 递归调用 prepare2run 函数处理下游组件 并生成运行状态信息
        for m in prepare2run(downstream):
            yield {"content": m, "running_status": True}

        # 在 ran 的值有效时 处理循环和组件运行
        # ran 的值在 0 和 self.path[-1] 的长度之间时继续执行
        while 0 <= ran < len(self.path[-1]):
            logging.debug(f"Canvas.run: {ran} {self.path}")
            # 从 self.path 中获取当前要处理的组件ID cpn_id
            # 并通过 get_component 方法获取组件对象
            cpn_id = self.path[-1][ran]
            cpn = self.get_component(cpn_id)
            # 如果当前组件没有下游组件、没有父组件ID 并且没有等待处理的组件 则退出循环
            if not any([cpn["downstream"], cpn.get("parent_id"), waiting]):
                break

            # 调用 _find_loop 方法检查是否存在循环 如果存在则抛出 OverflowError 异常
            loop = self._find_loop()
            if loop:
                raise OverflowError(f"Too much loops: {loop}")
            # 如果组件的名称是 “switch”、“categorize” 或 “relevant” 则处理这些特定类型的组件
            if cpn["obj"].component_name.lower() in ["switch", "categorize", "relevant"]:
                switch_out = cpn["obj"].output()[1].iloc[0, 0]
                assert switch_out in self.components, \
                    "{}'s output: {} not valid.".format(cpn_id, switch_out)
                # 递归调用 prepare2run 函数处理特定组件的输出 并生成运行状态信息 然后继续下一次循环
                for m in prepare2run([switch_out]):
                    yield {"content": m, "running_status": True}
                continue

            # 处理父组件输出合并
            downstream = cpn["downstream"]
            # 如果没有下游组件但有父组件ID 则合并父组件和当前组件的输出
            if not downstream and cpn.get("parent_id"):
                pid = cpn["parent_id"]
                _, o = cpn["obj"].output(allow_partial=False)
                _, oo = self.components[pid]["obj"].output(allow_partial=False)
                self.components[pid]["obj"].set_output(pd.concat([oo.dropna(axis=1), o.dropna(axis=1)], ignore_index=True))
                downstream = [pid]
            # 递归调用 prepare2run 函数处理下游组件 并生成运行状态信息
            for m in prepare2run(downstream):
                yield {"content": m, "running_status": True}

            # 如果 ran 的值等于或超过 self.path[-1] 的长度
            # 并且有等待处理的组件 则处理这些组件 并更新 ran 的值
            if ran >= len(self.path[-1]) and waiting:
                without_dependent_checking = waiting
                waiting = []
                for m in prepare2run(without_dependent_checking):
                    yield {"content": m, "running_status": True}
                without_dependent_checking = []
                ran -= 1
        # 如果 self.answer 列表非空 处理答案组件
        if self.answer:
            cpn_id = self.answer[0]
            self.answer.pop(0)
            ans = self.components[cpn_id]["obj"].run(self.history, **kwargs)
            self.path[-1].append(cpn_id)
            # 如果 self.answer 列表为空 抛出异常 提示需要在流程末尾添加交互组件
            if kwargs.get("stream"):
                assert isinstance(ans, partial)
                for an in ans():
                    yield an
            else:
                yield ans

        else:
            raise Exception("The dialog flow has no way to interact with you. Please add an 'Interact' component to the end of the flow.")

    def get_component(self, cpn_id):
        return self.components[cpn_id]

    def get_tenant_id(self):
        return self._tenant_id
    # 定义一个名为 get_history 的方法 它接受一个参数 window_size 这个参数用于确定需要返回多少条历史记录
    def get_history(self, window_size):
        # 初始化一个空列表convs  用于存储格式化后的对话记录
        convs = []
        # 使用一个 for 循环遍历 self.history 的最后 window_size 条记录
        # self.history 应该是一个列表 其中每个元素都是一个包含角色和对象(对话内容)的元组
        """
            role 是对话中的角色,例如 “user” 或 “assistant”。
            obj 是与该角色相关的对话内容,它可能是一个列表或单个对象
        """
        for role, obj in self.history[window_size * -1:]:
            if isinstance(obj, list) and obj and all([isinstance(o, dict) for o in obj]):
                convs.append({"role": role, "content": '\n'.join([str(s.get("content", "")) for s in obj])})
            else:
                convs.append({"role": role, "content": str(obj)})
        return convs

    def add_user_input(self, question):
        self.history.append(("user", question))

    def set_embedding_model(self, embed_id):
        self._embed_id = embed_id

    def get_embedding_model(self):
        return self._embed_id
 # 定义一个名为 _find_loop 的方法 它接受一个可选参数 max_loops默认值为6 这个参数用于限制循环检测的次数
    def _find_loop(self, max_loops=6):
        # 获取 self.path 列表的最后一个元素(即当前路径)并将其反转 以便从后往前检查
        path = self.path[-1][::-1]
        # 如果路径的长度小于2 则没有足够的元素形成循环 因此直接返回 False
        if len(path) < 2:
            return False
        # 遍历路径,如果找到以 “answer” 或 “iterationitem” 开头的元素
        # 则从路径中移除该元素及其后面的所有元素
        # 因为这些元素可能表示循环的结束
        for i in range(len(path)):
            if path[i].lower().find("answer") == 0 or path[i].lower().find("iterationitem") == 0:
                path = path[:i]
                break
        # 再次检查路径长度 如果移除特定元素后长度小于2 则返回 False
        if len(path) < 2:
            return False
        # 使用一个循环来检查路径的不同子序列 loc 是子序列的长度
        # 从 2 开始 直到路径长度的一半
        for loc in range(2, len(path) // 2):
            # 创建一个子序列pat 它是路径的前loc个元素 用逗号连接
            pat = ",".join(path[0:loc])
            # 创建一个字符串 path_str 它是整个路径 用逗号连接
            path_str = ",".join(path)
            # 如果子序列pat的长度大于或等于整个路径字符串 ath_str的长度 则不可能形成循环 因此返回 False
            if len(pat) >= len(path_str):
                return False
            """
            使用一个 while 循环来检查 path_str 是否以 pat 开头 
            如果是 则减少 loop 计数 并从 path_str 中移除 pat 和随后的逗号
            如果 loop 计数降到 0 以下 说明找到了循环
            """
            loop = max_loops
            while path_str.find(pat) == 0 and loop >= 0:
                loop -= 1
                if len(pat)+1 >= len(path_str):
                    return False
                path_str = path_str[len(pat)+1:]
                # 如果找到了循环 则创建一个循环模式的字符串 使用 " => " 作为分隔符
                # 并将路径中的每个元素的前半部分(在冒号之前的部分)连接起来
                # 最后 返回一个表示循环的模式字符串
            if loop < 0:
                pat = " => ".join([p.split(":")[0] for p in path[0:loc]])
                return pat + " => " + pat

        return False

    def get_prologue(self):
        return self.components["begin"]["obj"]._param.prologue

    def set_global_param(self, **kwargs):
        for k, v in kwargs.items():
            for q in self.components["begin"]["obj"]._param.query:
                if k != q["key"]:
                    continue
                q["value"] = v

    def get_preset_param(self):
        return self.components["begin"]["obj"]._param.query

    def get_component_input_elements(self, cpnnm):
        return self.components[cpnnm]["obj"].get_input_elements()